Upgrade futures-util to 0.3.30

This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util
For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md

Test: TreeHugger
Change-Id: I4d01cd43e35cccd70ff58f1669ac3697035e8c60
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index dde4c7f..1833f75 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json 
@@ -1,6 +1,6 @@  {  "git": { - "sha1": "5e3693a350f96244151081d2c030208cd15f9572" + "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26"  },  "path_in_vcs": "futures-util"  } \ No newline at end of file 
diff --git a/Android.bp b/Android.bp index 60e4e8e..a38886e 100644 --- a/Android.bp +++ b/Android.bp 
@@ -42,7 +42,7 @@  host_supported: true,  crate_name: "futures_util",  cargo_env_compat: true, - cargo_pkg_version: "0.3.26", + cargo_pkg_version: "0.3.30",  srcs: ["src/lib.rs"],  test_suites: ["general-tests"],  auto_gen_config: true, @@ -86,7 +86,7 @@  host_supported: true,  crate_name: "futures_util",  cargo_env_compat: true, - cargo_pkg_version: "0.3.26", + cargo_pkg_version: "0.3.30",  srcs: ["src/lib.rs"],  edition: "2018",  features: [ 
diff --git a/Cargo.toml b/Cargo.toml index 47e9f55..c95816a 100644 --- a/Cargo.toml +++ b/Cargo.toml 
@@ -11,9 +11,9 @@    [package]  edition = "2018" -rust-version = "1.45" +rust-version = "1.56"  name = "futures-util" -version = "0.3.26" +version = "0.3.30"  description = """  Common utilities and extension traits for the futures-rs library.  """ @@ -30,33 +30,33 @@  ]    [dependencies.futures-channel] -version = "0.3.26" +version = "0.3.30"  features = ["std"]  optional = true  default-features = false    [dependencies.futures-core] -version = "0.3.26" +version = "0.3.30"  default-features = false    [dependencies.futures-io] -version = "0.3.26" +version = "0.3.30"  features = ["std"]  optional = true  default-features = false    [dependencies.futures-macro] -version = "=0.3.26" +version = "=0.3.30"  optional = true  default-features = false    [dependencies.futures-sink] -version = "0.3.26" +version = "0.3.30"  optional = true  default-features = false    [dependencies.futures-task] -version = "0.3.26" +version = "0.3.30"  default-features = false    [dependencies.futures_01] 
diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 95c3dee..dcdbce4 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig 
@@ -1,8 +1,8 @@  [package]  name = "futures-util" -version = "0.3.26" +version = "0.3.30"  edition = "2018" -rust-version = "1.45" +rust-version = "1.56"  license = "MIT OR Apache-2.0"  repository = "https://github.com/rust-lang/futures-rs"  homepage = "https://rust-lang.github.io/futures-rs" @@ -35,12 +35,12 @@  cfg-target-has-atomic = []    [dependencies] -futures-core = { path = "../futures-core", version = "0.3.26", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.26", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.26", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.26", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.30", default-features = false, optional = true }  slab = { version = "0.4.2", optional = true }  memchr = { version = "2.2", optional = true }  futures_01 = { version = "0.1.25", optional = true, package = "futures" } 
diff --git a/METADATA b/METADATA index 93b66be..8ba8c32 100644 --- a/METADATA +++ b/METADATA 
@@ -1,23 +1,20 @@  # This project was upgraded with external_updater. -# Usage: tools/external_updater/updater.sh update rust/crates/futures-util -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md    name: "futures-util"  description: "Common utilities and extension traits for the futures-rs library."  third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/futures-util" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.26.crate" - } - version: "0.3.26"  license_type: NOTICE  last_upgrade_date { - year: 2023 + year: 2024  month: 2 - day: 15 + day: 1 + } + homepage: "https://crates.io/crates/futures-util" + identifier { + type: "Archive" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.30.crate" + version: "0.3.30"  }  } 
diff --git a/README.md b/README.md index 6e0aaed..60e2c21 100644 --- a/README.md +++ b/README.md 
@@ -11,7 +11,7 @@  futures-util = "0.3"  ```   -The current `futures-util` requires Rust 1.45 or later. +The current `futures-util` requires Rust 1.56 or later.    ## License   
diff --git a/benches/bilock.rs b/benches/bilock.rs new file mode 100644 index 0000000..013f335 --- /dev/null +++ b/benches/bilock.rs 
@@ -0,0 +1,68 @@ +#![feature(test)] +#![cfg(feature = "bilock")] + +extern crate test; + +use futures::task::Poll; +use futures_test::task::noop_context; +use futures_util::lock::BiLock; + +use crate::test::Bencher; + +#[bench] +fn contended(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_lock(&mut context) { + Poll::Pending => (), + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }); +} + +#[bench] +fn lock_unlock(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }) +} 
diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs index 64d5f9a..517b281 100644 --- a/benches/flatten_unordered.rs +++ b/benches/flatten_unordered.rs 
@@ -5,9 +5,10 @@    use futures::channel::oneshot;  use futures::executor::block_on; -use futures::future::{self, FutureExt}; +use futures::future;  use futures::stream::{self, StreamExt};  use futures::task::Poll; +use futures_util::FutureExt;  use std::collections::VecDeque;  use std::thread;   @@ -34,18 +35,9 @@  }  });   - let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { - async { - if let Some(next) = vals.next() { - let val = next.await.unwrap(); - Some((val, vals)) - } else { - None - } - } - .boxed() - }) - .flatten_unordered(None); + let mut flatten = stream::iter(rxs) + .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten()) + .flatten_unordered(None);    block_on(future::poll_fn(move |cx| {  let mut count = 0; 
diff --git a/benches_disabled/bilock.rs b/benches_disabled/bilock.rs deleted file mode 100644 index 417f75d..0000000 --- a/benches_disabled/bilock.rs +++ /dev/null 
@@ -1,122 +0,0 @@ -#![feature(test)] - -#[cfg(feature = "bilock")] -mod bench { - use futures::executor::LocalPool; - use futures::task::{Context, Waker}; - use futures_util::lock::BiLock; - use futures_util::lock::BiLockAcquire; - use futures_util::lock::BiLockAcquired; - use futures_util::task::ArcWake; - - use std::sync::Arc; - use test::Bencher; - - fn notify_noop() -> Waker { - struct Noop; - - impl ArcWake for Noop { - fn wake(_: &Arc<Self>) {} - } - - ArcWake::into_waker(Arc::new(Noop)) - } - - /// Pseudo-stream which simply calls `lock.poll()` on `poll` - struct LockStream { - lock: BiLockAcquire<u32>, - } - - impl LockStream { - fn new(lock: BiLock<u32>) -> Self { - Self { lock: lock.lock() } - } - - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired<u32>) { - self.lock = guard.unlock().lock() - } - } - - impl Stream for LockStream { - type Item = BiLockAcquired<u32>; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } - } - - #[bench] - fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); - } - - #[bench] - fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) - } -} 
diff --git a/build.rs b/build.rs deleted file mode 100644 index 05e0496..0000000 --- a/build.rs +++ /dev/null 
@@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} 
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs deleted file mode 100644 index 16ec628..0000000 --- a/no_atomic_cas.rs +++ /dev/null 
@@ -1,17 +0,0 @@ -// This file is @generated by no_atomic_cas.sh. -// It is not intended for manual editing. - -const NO_ATOMIC_CAS: &[&str] = &[ - "armv4t-none-eabi", - "armv5te-none-eabi", - "avr-unknown-gnu-atmega328", - "bpfeb-unknown-none", - "bpfel-unknown-none", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32im-unknown-none-elf", - "riscv32imc-unknown-none-elf", - "thumbv4t-none-eabi", - "thumbv5te-none-eabi", - "thumbv6m-none-eabi", -]; 
diff --git a/src/abortable.rs b/src/abortable.rs index e0afd47..9dbcfc2 100644 --- a/src/abortable.rs +++ b/src/abortable.rs 
@@ -78,6 +78,17 @@  pub(crate) inner: Arc<AbortInner>,  }   +impl AbortRegistration { + /// Create an [`AbortHandle`] from the given [`AbortRegistration`]. + /// + /// The created [`AbortHandle`] is functionally the same as any other + /// [`AbortHandle`]s that are associated with the same [`AbortRegistration`], + /// such as the one created by [`AbortHandle::new_pair`]. + pub fn handle(&self) -> AbortHandle { + AbortHandle { inner: self.inner.clone() } + } +} +  /// A handle to an `Abortable` task.  #[derive(Debug, Clone)]  pub struct AbortHandle { @@ -182,4 +193,17 @@  self.inner.aborted.store(true, Ordering::Relaxed);  self.inner.waker.wake();  } + + /// Checks whether [`AbortHandle::abort`] was *called* on any associated + /// [`AbortHandle`]s, which includes all the [`AbortHandle`]s linked with + /// the same [`AbortRegistration`]. This means that it will return `true` + /// even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + /// + /// This operation has a Relaxed ordering. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + }  } 
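The hunk above adds AbortRegistration::handle and AbortHandle::is_aborted. A minimal usage sketch (illustrative only; assumes the `futures` facade crate at 0.3.30 re-exporting these items):

use futures::executor::block_on;
use futures::future::{self, AbortHandle, Abortable};

fn main() {
    let (handle, registration) = AbortHandle::new_pair();
    // A handle derived from the registration is interchangeable with the
    // one returned by new_pair().
    let derived = registration.handle();
    let task = Abortable::new(future::pending::<()>(), registration);

    assert!(!handle.is_aborted());
    derived.abort();
    assert!(handle.is_aborted());
    assert!(block_on(task).is_err()); // the wrapped future was aborted
}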
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs index 7276da2..7e3f12c 100644 --- a/src/async_await/mod.rs +++ b/src/async_await/mod.rs 
@@ -31,9 +31,11 @@  pub use self::select_mod::*;    // Primary export is a macro +#[cfg(feature = "std")]  #[cfg(feature = "async-await-macro")]  mod stream_select_mod;  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 +#[cfg(feature = "std")]  #[cfg(feature = "async-await-macro")]  pub use self::stream_select_mod::*;   
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs index 1c8002f..61e3fa1 100644 --- a/src/async_await/stream_select_mod.rs +++ b/src/async_await/stream_select_mod.rs 
@@ -1,6 +1,5 @@  //! The `stream_select` macro.   -#[cfg(feature = "std")]  #[allow(unreachable_pub)]  #[doc(hidden)]  pub use futures_macro::stream_select_internal; @@ -28,7 +27,6 @@  /// }  /// # });  /// ``` -#[cfg(feature = "std")]  #[macro_export]  macro_rules! stream_select {  ($($tokens:tt)*) => {{ 
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index 597aec1..2257906 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs 
@@ -1,6 +1,5 @@  use core::pin::Pin;  use futures_core::future::{FusedFuture, Future}; -use futures_core::ready;  use futures_core::task::{Context, Poll};  use pin_project_lite::pin_project;   @@ -81,13 +80,12 @@  type Output = Fut::Output;    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { - Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { - Some(fut) => { - let output = ready!(fut.poll(cx)); + match self.as_mut().project().inner.as_pin_mut() { + Some(fut) => fut.poll(cx).map(|output| {  self.project().inner.set(None);  output - } - None => return Poll::Pending, - }) + }), + None => Poll::Pending, + }  }  } 
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index c11d108..955af37 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs 
@@ -463,10 +463,6 @@  /// ```  ///  /// ``` - /// // Note, unlike most examples this is written in the context of a - /// // synchronous function to better illustrate the cross-thread aspect of - /// // the `shared` combinator. - ///  /// # futures::executor::block_on(async {  /// use futures::future::FutureExt;  /// use futures::executor::block_on; 
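Since this hunk trims the `shared` doc example, a compact sketch of the combinator for reference (illustrative only, not part of this patch):

use futures::executor::block_on;
use futures::future::FutureExt;

fn main() {
    block_on(async {
        // shared() turns a future with a cloneable output into a cloneable
        // future; every clone resolves to the same value.
        let shared = async { 6 }.shared();
        let clone = shared.clone();
        assert_eq!(shared.await, 6);
        assert_eq!(clone.await, 6);
    });
}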
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index ecd1b42..9ab3b4f 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs 
@@ -37,10 +37,6 @@  }  }   -// The future itself is polled behind the `Arc`, so it won't be moved -// when `Shared` is moved. -impl<Fut: Future> Unpin for Shared<Fut> {} -  impl<Fut: Future> fmt::Debug for Shared<Fut> {  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {  f.debug_struct("Shared") 
diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 7dc159b..79eee8d 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs 
@@ -12,7 +12,7 @@    use super::{assert_future, MaybeDone};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  use crate::stream::{Collect, FuturesOrdered, StreamExt};    pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { @@ -31,7 +31,7 @@  kind: JoinAllKind<F>,  }   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  pub(crate) const SMALL: usize = 30;    enum JoinAllKind<F> @@ -41,7 +41,7 @@  Small {  elems: Pin<Box<[MaybeDone<F>]>>,  }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  Big {  fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,  }, @@ -57,7 +57,7 @@  JoinAllKind::Small { ref elems } => {  f.debug_struct("JoinAll").field("elems", elems).finish()  } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),  }  } @@ -77,7 +77,7 @@  ///  /// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance  /// reasons if the number of futures is large. You may want to look into using it or -/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// its counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.  ///  /// Some examples for additional functionality provided by these are:  /// @@ -106,7 +106,8 @@  {  let iter = iter.into_iter();   - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))]  {  let kind =  JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() }; @@ -114,7 +115,7 @@  assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })  }   - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  {  let kind = match iter.size_hint().1 {  Some(max) if max <= SMALL => JoinAllKind::Small { @@ -153,7 +154,7 @@  Poll::Pending  }  } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),  }  } 
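For context on the SMALL threshold referenced above, a usage sketch of join_all (illustrative only; the observable result is identical whichever internal representation is chosen):

use futures::executor::block_on;
use futures::future::{self, join_all};

fn main() {
    // With more than 30 futures (SMALL above), join_all switches to the
    // FuturesOrdered-backed "Big" variant; outputs still come back in order.
    let futs = (0..40).map(future::ready);
    let results = block_on(join_all(futs));
    assert_eq!(results, (0..40).collect::<Vec<i32>>());
}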
diff --git a/src/future/mod.rs b/src/future/mod.rs index 374e365..2d8fa4f 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs 
@@ -111,13 +111,13 @@  mod either;  pub use self::either::Either;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use abortable::abortable;   
diff --git a/src/future/select.rs b/src/future/select.rs index e693a30..7e33d19 100644 --- a/src/future/select.rs +++ b/src/future/select.rs 
@@ -99,17 +99,26 @@  type Output = Either<(A::Output, B), (B::Output, A)>;    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where + /// `Option::unwrap` does not. + #[inline(always)] + fn unwrap_option<T>(value: Option<T>) -> T { + match value { + None => unreachable!(), + Some(value) => value, + } + } + + let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");    if let Poll::Ready(val) = a.poll_unpin(cx) { - return Poll::Ready(Either::Left((val, b))); + return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1)));  }    if let Poll::Ready(val) = b.poll_unpin(cx) { - return Poll::Ready(Either::Right((val, a))); + return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0)));  }   - self.inner = Some((a, b));  Poll::Pending  }  } 
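The change keeps both futures stored until one completes, so select behaves the same from the caller's perspective. A minimal sketch (illustrative only, assuming the `futures` facade crate):

use futures::executor::block_on;
use futures::future::{self, select, Either};

fn main() {
    block_on(async {
        // Both inputs must be Unpin; ready() and pending() both are.
        let a = future::ready(1);
        let b = future::pending::<i32>();
        match select(a, b).await {
            Either::Left((value, _still_pending)) => assert_eq!(value, 1),
            Either::Right(_) => unreachable!(),
        }
    });
}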
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 506f450..2d6a2a0 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs 
@@ -12,7 +12,7 @@    use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};  use crate::TryFutureExt;   @@ -38,7 +38,7 @@  Small {  elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,  }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  Big {  fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,  }, @@ -56,7 +56,7 @@  TryJoinAllKind::Small { ref elems } => {  f.debug_struct("TryJoinAll").field("elems", elems).finish()  } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),  }  } @@ -121,7 +121,8 @@  {  let iter = iter.into_iter().map(TryFutureExt::into_future);   - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))]  {  let kind = TryJoinAllKind::Small {  elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), @@ -132,7 +133,7 @@  )  }   - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  {  let kind = match iter.size_hint().1 {  Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { @@ -184,7 +185,7 @@  }  }  } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),  }  } 
diff --git a/src/future/try_select.rs b/src/future/try_select.rs index 4d0b7ff..bc282f7 100644 --- a/src/future/try_select.rs +++ b/src/future/try_select.rs 
@@ -12,6 +12,9 @@    impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}   +type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>; +type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>; +  /// Waits for either one of two differently-typed futures to complete.  ///  /// This function will return a new future which awaits for either one of both @@ -52,10 +55,9 @@  A: TryFuture + Unpin,  B: TryFuture + Unpin,  { - super::assert_future::< - Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, - _, - >(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect { + inner: Some((future1, future2)), + })  }    impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> @@ -63,8 +65,7 @@  A: TryFuture,  B: TryFuture,  { - #[allow(clippy::type_complexity)] - type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; + type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {  let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); 
diff --git a/src/io/copy_buf_abortable.rs b/src/io/copy_buf_abortable.rs index fdbc4a5..ed22d62 100644 --- a/src/io/copy_buf_abortable.rs +++ b/src/io/copy_buf_abortable.rs 
@@ -57,7 +57,7 @@  }    pin_project! { - /// Future for the [`copy_buf()`] function. + /// Future for the [`copy_buf_abortable()`] function.  #[derive(Debug)]  #[must_use = "futures do nothing unless you `.await` or poll them"]  pub struct CopyBufAbortable<'a, R, W: ?Sized> { 
diff --git a/src/io/cursor.rs b/src/io/cursor.rs index b6fb372..c6e2aee 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs 
@@ -1,6 +1,4 @@  use futures_core::task::{Context, Poll}; -#[cfg(feature = "read_initializer")] -use futures_io::Initializer;  use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};  use std::io;  use std::pin::Pin; @@ -159,12 +157,6 @@  }    impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> { - #[cfg(feature = "read_initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - io::Read::initializer(&self.inner) - } -  fn poll_read(  mut self: Pin<&mut Self>,  _cx: &mut Context<'_>, 
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs index a1484c0..45862b8 100644 --- a/src/io/fill_buf.rs +++ b/src/io/fill_buf.rs 
@@ -3,6 +3,7 @@  use futures_io::AsyncBufRead;  use std::io;  use std::pin::Pin; +use std::slice;    /// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.  #[derive(Debug)] @@ -30,17 +31,12 @@  let reader = this.reader.take().expect("Polled FillBuf after completion");    match Pin::new(&mut *reader).poll_fill_buf(cx) { - // With polonius it is possible to remove this inner match and just have the correct - // lifetime of the reference inferred based on which branch is taken - Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { - Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), - Poll::Ready(Err(err)) => { - unreachable!("reader indicated readiness but then returned an error: {:?}", err) - } - Poll::Pending => { - unreachable!("reader indicated readiness but then returned pending") - } - }, + Poll::Ready(Ok(slice)) => { + // With polonius it is possible to remove this lifetime transmutation and just have + // the correct lifetime of the reference inferred based on which branch is taken + let slice: &'a [u8] = unsafe { slice::from_raw_parts(slice.as_ptr(), slice.len()) }; + Poll::Ready(Ok(slice)) + }  Poll::Ready(Err(err)) => Poll::Ready(Err(err)),  Poll::Pending => {  this.reader = Some(reader); 
diff --git a/src/io/mod.rs b/src/io/mod.rs index 8ce3ad6..fdad60b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs 
@@ -804,11 +804,11 @@  /// use futures::io::{AsyncBufReadExt, Cursor};  /// use futures::stream::StreamExt;  /// - /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); + /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");  /// - /// let mut lines_stream = cursor.lines().map(|l| l.unwrap()); + /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));  /// assert_eq!(lines_stream.next().await, Some(String::from("lorem"))); - /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum"))); + /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));  /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));  /// assert_eq!(lines_stream.next().await, None);  /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 
diff --git a/src/io/read_line.rs b/src/io/read_line.rs index e1b8fc9..df782c9 100644 --- a/src/io/read_line.rs +++ b/src/io/read_line.rs 
@@ -35,6 +35,7 @@  ) -> Poll<io::Result<usize>> {  let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));  if str::from_utf8(bytes).is_err() { + bytes.clear();  Poll::Ready(ret.and_then(|_| {  Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))  })) 
diff --git a/src/io/split.rs b/src/io/split.rs index 3f1b9af..81d1e6d 100644 --- a/src/io/split.rs +++ b/src/io/split.rs 
@@ -31,6 +31,13 @@  (ReadHalf { handle: a }, WriteHalf { handle: b })  }   +impl<T> ReadHalf<T> { + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool { + self.handle.is_pair_of(&other.handle) + } +} +  impl<T: Unpin> ReadHalf<T> {  /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back  /// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are @@ -42,6 +49,13 @@  }  }   +impl<T> WriteHalf<T> { + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool { + self.handle.is_pair_of(&other.handle) + } +} +  impl<T: Unpin> WriteHalf<T> {  /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back  /// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are 
diff --git a/src/io/window.rs b/src/io/window.rs index 77b7267..d857282 100644 --- a/src/io/window.rs +++ b/src/io/window.rs 
@@ -1,6 +1,6 @@  use std::ops::{Bound, Range, RangeBounds};   -/// A owned window around an underlying buffer. +/// An owned window around an underlying buffer.  ///  /// Normally slices work great for considering sub-portions of a buffer, but  /// unfortunately a slice is a *borrowed* type in Rust which has an associated 
diff --git a/src/lib.rs b/src/lib.rs index 9a10c93..208eb73 100644 --- a/src/lib.rs +++ b/src/lib.rs 
@@ -329,7 +329,7 @@  #[cfg(feature = "alloc")]  pub mod lock;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod abortable;   
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index 2174079..a89678e 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs 
@@ -3,11 +3,11 @@  use alloc::boxed::Box;  use alloc::sync::Arc;  use core::cell::UnsafeCell; -use core::fmt;  use core::ops::{Deref, DerefMut};  use core::pin::Pin; -use core::sync::atomic::AtomicUsize; +use core::sync::atomic::AtomicPtr;  use core::sync::atomic::Ordering::SeqCst; +use core::{fmt, ptr};  #[cfg(feature = "bilock")]  use futures_core::future::Future;  use futures_core::task::{Context, Poll, Waker}; @@ -41,7 +41,7 @@    #[derive(Debug)]  struct Inner<T> { - state: AtomicUsize, + state: AtomicPtr<Waker>,  value: Option<UnsafeCell<T>>,  }   @@ -61,7 +61,10 @@  /// Similarly, reuniting the lock and extracting the inner value is only  /// possible when `T` is `Unpin`.  pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); + let arc = Arc::new(Inner { + state: AtomicPtr::new(ptr::null_mut()), + value: Some(UnsafeCell::new(t)), + });    (Self { arc: arc.clone() }, Self { arc })  } @@ -87,7 +90,8 @@  pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {  let mut waker = None;  loop { - match self.arc.state.swap(1, SeqCst) { + let n = self.arc.state.swap(invalid_ptr(1), SeqCst); + match n as usize {  // Woohoo, we grabbed the lock!  0 => return Poll::Ready(BiLockGuard { bilock: self }),   @@ -96,8 +100,8 @@    // A task was previously blocked on this lock, likely our task,  // so we need to update that task. - n => unsafe { - let mut prev = Box::from_raw(n as *mut Waker); + _ => unsafe { + let mut prev = Box::from_raw(n);  *prev = cx.waker().clone();  waker = Some(prev);  }, @@ -105,9 +109,9 @@    // type ascription for safety's sake!  let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); - let me = Box::into_raw(me) as usize; + let me = Box::into_raw(me);   - match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { + match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {  // The lock is still locked, but we've now parked ourselves, so  // just report that we're scheduled to receive a notification.  Ok(_) => return Poll::Pending, @@ -115,8 +119,8 @@  // Oops, looks like the lock was unlocked after our swap above  // and before the compare_exchange. Deallocate what we just  // allocated and go through the loop again. - Err(0) => unsafe { - waker = Some(Box::from_raw(me as *mut Waker)); + Err(n) if n.is_null() => unsafe { + waker = Some(Box::from_raw(me));  },    // The top of this loop set the previous state to 1, so if we @@ -125,7 +129,7 @@  // but we're trying to acquire the lock and there's only one  // other reference of the lock, so it should be impossible for  // that task to ever block itself. - Err(n) => panic!("invalid state: {}", n), + Err(n) => panic!("invalid state: {}", n as usize),  }  }  } @@ -145,6 +149,11 @@  BiLockAcquire { bilock: self }  }   + /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`. + pub fn is_pair_of(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.arc, &other.arc) + } +  /// Attempts to put the two "halves" of a `BiLock<T>` back together and  /// recover the original value. Succeeds only if the two `BiLock<T>`s  /// originated from the same call to `BiLock::new`. 
@@ -152,7 +161,7 @@  where  T: Unpin,  { - if Arc::ptr_eq(&self.arc, &other.arc) { + if self.is_pair_of(&other) {  drop(other);  let inner = Arc::try_unwrap(self.arc)  .ok() @@ -164,7 +173,8 @@  }    fn unlock(&self) { - match self.arc.state.swap(0, SeqCst) { + let n = self.arc.state.swap(ptr::null_mut(), SeqCst); + match n as usize {  // we've locked the lock, shouldn't be possible for us to see an  // unlocked lock.  0 => panic!("invalid unlocked state"), @@ -174,8 +184,8 @@    // Another task has parked themselves on this lock, let's wake them  // up as its now their turn. - n => unsafe { - Box::from_raw(n as *mut Waker).wake(); + _ => unsafe { + Box::from_raw(n).wake();  },  }  } @@ -189,7 +199,7 @@    impl<T> Drop for Inner<T> {  fn drop(&mut self) { - assert_eq!(self.state.load(SeqCst), 0); + assert!(self.state.load(SeqCst).is_null());  }  }   @@ -277,3 +287,12 @@  self.bilock.poll_lock(cx)  }  } + +// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible. +#[allow(clippy::useless_transmute)] +#[inline] +fn invalid_ptr<T>(addr: usize) -> *mut T { + // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that + // pointer). + unsafe { core::mem::transmute(addr) } +} 
diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 0be7271..8ca0ff6 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs 
@@ -3,25 +3,25 @@  //! This module is only available when the `std` or `alloc` feature of this  //! library is activated, and it is activated by default.   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(any(feature = "sink", feature = "io"))]  #[cfg(not(feature = "bilock"))]  pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "bilock")]  #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]  pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "std")]  pub use self::mutex::{  MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,  };   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]  #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]  #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]  mod bilock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "std")]  mod mutex; 
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index 618bf1b..2cc144e 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs 
@@ -19,7 +19,8 @@  struct OrderWrapper<T> {  #[pin]  data: T, // A future or a future's output - index: isize, + // Use i64 for index since isize may overflow in 32-bit targets. + index: i64,  }  }   @@ -58,36 +59,39 @@    /// An unbounded queue of futures.  /// -/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order -/// on top of the set of futures. While futures in the set will race to +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO +/// order on top of the set of futures. While futures in the set will race to  /// completion in parallel, results will only be returned in the order their  /// originating futures were added to the queue.  ///  /// Futures are pushed into this queue and their realized values are yielded in  /// order. This structure is optimized to manage a large number of futures. -/// Futures managed by `FuturesOrdered` will only be polled when they generate +/// Futures managed by [`FuturesOrdered`] will only be polled when they generate  /// notifications. This reduces the required amount of work needed to coordinate  /// large numbers of futures.  /// -/// When a `FuturesOrdered` is first created, it does not contain any futures. -/// Calling `poll` in this state will result in `Poll::Ready(None))` to be -/// returned. Futures are submitted to the queue using `push`; however, the -/// future will **not** be polled at this point. `FuturesOrdered` will only -/// poll managed futures when `FuturesOrdered::poll` is called. As such, it -/// is important to call `poll` after pushing new futures. +/// When a [`FuturesOrdered`] is first created, it does not contain any futures. +/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will result +/// in [`Poll::Ready(None)`](Poll::Ready) to be returned. Futures are submitted +/// to the queue using [`push_back`](FuturesOrdered::push_back) (or +/// [`push_front`](FuturesOrdered::push_front)); however, the future will +/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed +/// futures when [`FuturesOrdered::poll_next`] is called. As such, it +/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing +/// new futures.  /// -/// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that -/// the queue is currently not managing any futures. A future may be submitted -/// to the queue at a later time. At that point, a call to -/// `FuturesOrdered::poll` will either return the future's resolved value -/// **or** `Poll::Pending` if the future has not yet completed. When -/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will -/// return `Poll::Pending` until the first future completes, even if +/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready) +/// this means that the queue is currently not managing any futures. A future +/// may be submitted to the queue at a later time. At that point, a call to +/// [`FuturesOrdered::poll_next`] will either return the future's resolved value +/// **or** [`Poll::Pending`] if the future has not yet completed. When +/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`] +/// will return [`Poll::Pending`] until the first future completes, even if  /// some of the later futures have already completed.  
/// -/// Note that you can create a ready-made `FuturesOrdered` via the +/// Note that you can create a ready-made [`FuturesOrdered`] via the  /// [`collect`](Iterator::collect) method, or you can start with an empty queue -/// with the `FuturesOrdered::new` constructor. +/// with the [`FuturesOrdered::new`] constructor.  ///  /// This type is only available when the `std` or `alloc` feature of this  /// library is activated, and it is activated by default. @@ -95,8 +99,8 @@  pub struct FuturesOrdered<T: Future> {  in_progress_queue: FuturesUnordered<OrderWrapper<T>>,  queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, - next_incoming_index: isize, - next_outgoing_index: isize, + next_incoming_index: i64, + next_outgoing_index: i64,  }    impl<T: Future> Unpin for FuturesOrdered<T> {} @@ -104,8 +108,9 @@  impl<Fut: Future> FuturesOrdered<Fut> {  /// Constructs a new, empty `FuturesOrdered`  /// - /// The returned `FuturesOrdered` does not contain any futures and, in this - /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. + /// The returned [`FuturesOrdered`] does not contain any futures and, in + /// this state, [`FuturesOrdered::poll_next`] will return + /// [`Poll::Ready(None)`](Poll::Ready).  pub fn new() -> Self {  Self {  in_progress_queue: FuturesUnordered::new(), @@ -132,9 +137,9 @@  /// Push a future into the queue.  ///  /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications.  #[deprecated(note = "use `push_back` instead")]  pub fn push(&mut self, future: Fut) {  self.push_back(future); @@ -143,9 +148,9 @@  /// Pushes a future to the back of the queue.  ///  /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications.  pub fn push_back(&mut self, future: Fut) {  let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };  self.next_incoming_index += 1; @@ -155,10 +160,10 @@  /// Pushes a future to the front of the queue.  ///  /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. This future will be the next future to be returned - /// complete. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. This future will be + /// the next future to be returned complete.  pub fn push_front(&mut self, future: Fut) {  let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };  self.next_outgoing_index -= 1; 
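The documentation above now points at push_back/push_front and poll_next; a compact sketch of that API (illustrative only, not part of this patch):

use futures::executor::block_on;
use futures::future;
use futures::stream::{FuturesOrdered, StreamExt};

fn main() {
    block_on(async {
        let mut queue = FuturesOrdered::new();
        queue.push_back(future::ready(1));
        queue.push_back(future::ready(2));
        // push_front makes this future's output the next item yielded.
        queue.push_front(future::ready(0));

        assert_eq!(queue.next().await, Some(0));
        assert_eq!(queue.next().await, Some(1));
        assert_eq!(queue.next().await, Some(2));
        assert_eq!(queue.next().await, None);
    });
}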
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 6b5804d..dedf75d 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs 
@@ -62,7 +62,7 @@  }    unsafe impl<Fut: Send> Send for FuturesUnordered<Fut> {} -unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {} +unsafe impl<Fut: Send + Sync> Sync for FuturesUnordered<Fut> {}  impl<Fut> Unpin for FuturesUnordered<Fut> {}    impl Spawn for FuturesUnordered<FutureObj<'_, ()>> { @@ -558,20 +558,7 @@  impl<Fut> FuturesUnordered<Fut> {  /// Clears the set, removing all futures.  pub fn clear(&mut self) { - self.clear_head_all(); - - // we just cleared all the tasks, and we have &mut self, so this is safe. - unsafe { self.ready_to_run_queue.clear() }; - - self.is_terminated.store(false, Relaxed); - } - - fn clear_head_all(&mut self) { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); - } + *self = Self::new();  }  }   @@ -581,7 +568,11 @@  // associated with it. At the same time though there may be tons of  // wakers flying around which contain `Task<Fut>` references  // inside them. We'll let those naturally get deallocated. - self.clear_head_all(); + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + }    // Note that at this point we could still have a bunch of tasks in the  // ready to run queue. None of those tasks, however, have futures 
diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs index 4518705..a924935 100644 --- a/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/src/stream/futures_unordered/ready_to_run_queue.rs 
@@ -85,38 +85,25 @@  pub(super) fn stub(&self) -> *const Task<Fut> {  Arc::as_ptr(&self.stub)  } - - // Clear the queue of tasks. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. This method just pulls out - // tasks and drops their refcounts. - // - // # Safety - // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) - // - The caller **must** guarantee unique access to `self` - pub(crate) unsafe fn clear(&self) { - loop { - // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } - }  }    impl<Fut> Drop for ReadyToRunQueue<Fut> {  fn drop(&mut self) {  // Once we're in the destructor for `Inner<Fut>` we need to clear out  // the ready to run queue of tasks if there's anything left in there. - - // All tasks have had their futures dropped already by the `FuturesUnordered` - // destructor above, and we have &mut self, so this is safe. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. All tasks should have had + // their futures dropped already by the `FuturesUnordered` destructor + // above, so we're just pulling out tasks and dropping their refcounts.  unsafe { - self.clear(); + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + }  }  }  } 
diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ec685b9..2438e58 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs 
@@ -18,9 +18,10 @@  #[allow(clippy::module_inception)]  mod stream;  pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + Unzip, Zip,  };    #[cfg(feature = "std")] @@ -36,11 +37,13 @@  #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]  pub use self::stream::Forward;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; +pub use self::stream::{ + BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, +};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "sink")]  #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]  #[cfg(feature = "alloc")] @@ -48,9 +51,9 @@    mod try_stream;  pub use self::try_stream::{ - try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext, - TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, + try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, + TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, + TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,  };    #[cfg(feature = "io")] @@ -58,12 +61,14 @@  #[cfg(feature = "std")]  pub use self::try_stream::IntoAsyncRead;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +pub use self::try_stream::{ + TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent, +};    #[cfg(feature = "alloc")] -pub use self::try_stream::{TryChunks, TryChunksError}; +pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError};    // Primitive streams   @@ -100,36 +105,36 @@  mod unfold;  pub use self::unfold::{unfold, Unfold};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod futures_ordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use self::futures_ordered::FuturesOrdered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub mod futures_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[doc(inline)]  pub use self::futures_unordered::FuturesUnordered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub mod select_all; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = 
"alloc")]  #[doc(inline)]  pub use self::select_all::{select_all, SelectAll};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use abortable::abortable;   
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index 3474331..121b6a0 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs 
@@ -8,29 +8,24 @@  use futures_core::stream::{FusedStream, Stream};  use futures_core::task::{Context, Poll};   -use pin_project_lite::pin_project; -  use super::assert_stream;  use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture};   -pin_project! { - /// An unbounded set of streams - /// - /// This "combinator" provides the ability to maintain a set of streams - /// and drive them all to completion. - /// - /// Streams are pushed into this set and their realized values are - /// yielded as they become ready. Streams will only be polled when they - /// generate notifications. This allows to coordinate a large number of streams. - /// - /// Note that you can create a ready-made `SelectAll` via the - /// `select_all` function in the `stream` module, or you can start with an - /// empty set with the `SelectAll::new` constructor. - #[must_use = "streams do nothing unless polled"] - pub struct SelectAll<St> { - #[pin] - inner: FuturesUnordered<StreamFuture<St>>, - } +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll<St> { + inner: FuturesUnordered<StreamFuture<St>>,  }    impl<St: Debug> Debug for SelectAll<St> { 
diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index ba2baa5..1435c79 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs 
@@ -13,7 +13,7 @@  #[pin]  stream: St,  f: F, - accum: Option<bool>, + done: bool,  #[pin]  future: Option<Fut>,  } @@ -27,7 +27,7 @@  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {  f.debug_struct("All")  .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done)  .field("future", &self.future)  .finish()  } @@ -40,7 +40,7 @@  Fut: Future<Output = bool>,  {  pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(true), future: None } + Self { stream, f, done: false, future: None }  }  }   @@ -51,7 +51,7 @@  Fut: Future<Output = bool>,  {  fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none()  }  }   @@ -67,21 +67,22 @@  let mut this = self.project();  Poll::Ready(loop {  if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); - if !acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if !res { + *this.done = true;  break false;  } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done {  // we're waiting on a new item from the stream  match ready!(this.stream.as_mut().poll_next(cx)) {  Some(item) => {  this.future.set(Some((this.f)(item)));  }  None => { - break this.accum.take().unwrap(); + *this.done = true; + break true;  }  }  } else { 
diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index f023125..cc3d695 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs 
@@ -13,7 +13,7 @@  #[pin]  stream: St,  f: F, - accum: Option<bool>, + done: bool,  #[pin]  future: Option<Fut>,  } @@ -27,7 +27,7 @@  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {  f.debug_struct("Any")  .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done)  .field("future", &self.future)  .finish()  } @@ -40,7 +40,7 @@  Fut: Future<Output = bool>,  {  pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(false), future: None } + Self { stream, f, done: false, future: None }  }  }   @@ -51,7 +51,7 @@  Fut: Future<Output = bool>,  {  fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none()  }  }   @@ -67,21 +67,22 @@  let mut this = self.project();  Poll::Ready(loop {  if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() || ready!(fut.poll(cx)); - if acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if res { + *this.done = true;  break true;  } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done {  // we're waiting on a new item from the stream  match ready!(this.stream.as_mut().poll_next(cx)) {  Some(item) => {  this.future.set(Some((this.f)(item)));  }  None => { - break this.accum.take().unwrap(); + *this.done = true; + break false;  }  }  } else { 
diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs index 07f971c..44c6ace 100644 --- a/src/stream/stream/flatten_unordered.rs +++ b/src/stream/stream/flatten_unordered.rs 
@@ -3,6 +3,7 @@  cell::UnsafeCell,  convert::identity,  fmt, + marker::PhantomData,  num::NonZeroUsize,  pin::Pin,  sync::atomic::{AtomicU8, Ordering}, @@ -22,8 +23,11 @@    use crate::stream::FuturesUnordered;   -/// There is nothing to poll and stream isn't being -/// polled or waking at the moment. +/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) +/// method. +pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>; + +/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.  const NONE: u8 = 0;    /// Inner streams need to be polled. @@ -32,26 +36,19 @@  /// The base stream needs to be polled.  const NEED_TO_POLL_STREAM: u8 = 0b10;   -/// It needs to poll base stream and inner streams. +/// Both base stream and inner streams need to be polled.  const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;    /// The current stream is being polled at the moment.  const POLLING: u8 = 0b100;   -/// Inner streams are being woken at the moment. -const WAKING_INNER_STREAMS: u8 = 0b1000; - -/// The base stream is being woken at the moment. -const WAKING_STREAM: u8 = 0b10000; - -/// The base stream and inner streams are being woken at the moment. -const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; +/// Stream is being woken at the moment. +const WAKING: u8 = 0b1000;    /// The stream was waked and will be polled. -const WOKEN: u8 = 0b100000; +const WOKEN: u8 = 0b10000;   -/// Determines what needs to be polled, and is stream being polled at the -/// moment or not. +/// Internal polling state of the stream.  #[derive(Clone, Debug)]  struct SharedPollState {  state: Arc<AtomicU8>, @@ -64,14 +61,14 @@  }    /// Attempts to start polling, returning stored state in case of success. - /// Returns `None` if some waker is waking at the moment. + /// Returns `None` if either waker is waking at the moment.  fn start_polling(  &self,  ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {  let value = self  .state  .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING_ALL == NONE { + if value & WAKING == NONE {  Some(POLLING)  } else {  None @@ -83,23 +80,20 @@  Some((value, bomb))  }   - /// Starts the waking process and performs bitwise or with the given value. + /// Attempts to start the waking process and performs bitwise or with the given value. + /// + /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however + /// state will be disjuncted with the given value.  
fn start_waking(  &self,  to_poll: u8, - waking: u8,  ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {  let value = self  .state  .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - // Waking process for this waker already started - if value & waking != NONE { - return None; - }  let mut next_value = value | to_poll; - // Only start the waking process if we're not in the polling phase and the stream isn't woken already  if value & (WOKEN | POLLING) == NONE { - next_value |= waking; + next_value |= WAKING;  }    if next_value != value { @@ -110,8 +104,9 @@  })  .ok()?;   - if value & (WOKEN | POLLING) == NONE { - let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already + if value & (WOKEN | POLLING | WAKING) == NONE { + let bomb = PollStateBomb::new(self, SharedPollState::stop_waking);    Some((value, bomb))  } else { @@ -123,7 +118,7 @@  /// - `!POLLING` allowing to use wakers  /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,  /// or `will_be_woken` flag supplied - /// - `!WAKING_ALL` as + /// - `!WAKING` as  /// * Wakers called during the `POLLING` phase won't propagate their calls  /// * `POLLING` phase can't start if some of the wakers are active  /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. @@ -138,20 +133,17 @@  }  next_value |= value;   - Some(next_value & !POLLING & !WAKING_ALL) + Some(next_value & !POLLING & !WAKING)  })  .unwrap()  }    /// Toggles state to non-waking, allowing to start polling. - fn stop_waking(&self, waking: u8) -> u8 { - self.state + fn stop_waking(&self) -> u8 { + let value = self + .state  .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let mut next_value = value & !waking; - // Waker will be called only if the current waking state is the same as the specified waker state - if value & WAKING_ALL == waking { - next_value |= WOKEN; - } + let next_value = value & !WAKING | WOKEN;    if next_value != value {  Some(next_value) @@ -159,12 +151,15 @@  None  }  }) - .unwrap_or_else(identity) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value  }    /// Resets current state allowing to poll the stream and wake up wakers.  fn reset(&self) -> u8 { - self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel) + self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)  }  }   @@ -184,11 +179,6 @@  fn deactivate(mut self) {  self.drop.take();  } - - /// Manually fires the bomb, returning supplied state. - fn fire(mut self) -> Option<u8> { - self.drop.take().map(|drop| (drop)(self.state)) - }  }    impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> { @@ -201,16 +191,16 @@    /// Will update state with the provided value on `wake_by_ref` call  /// and then, if there is a need, call `inner_waker`. -struct InnerWaker { +struct WrappedWaker {  inner_waker: UnsafeCell<Option<Waker>>,  poll_state: SharedPollState,  need_to_poll: u8,  }   -unsafe impl Send for InnerWaker {} -unsafe impl Sync for InnerWaker {} +unsafe impl Send for WrappedWaker {} +unsafe impl Sync for WrappedWaker {}   -impl InnerWaker { +impl WrappedWaker {  /// Replaces given waker's inner_waker for polling stream/futures which will  /// update poll state on `wake_by_ref` call. Use only if you need several  /// contexts. 
@@ -218,25 +208,19 @@  /// ## Safety  ///  /// This function will modify waker's `inner_waker` via `UnsafeCell`, so - /// it should be used only during `POLLING` phase. - unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker { + /// it should be used only during `POLLING` phase by one thread at the time. + unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) {  *self_arc.inner_waker.get() = cx.waker().clone().into(); - waker(self_arc.clone())  }    /// Attempts to start the waking process for the waker with the given value.  /// If succeeded, then the stream isn't yet woken and not being polled at the moment.  fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { - self.poll_state.start_waking(self.need_to_poll, self.waking_state()) - } - - /// Returns the corresponding waking state toggled by this waker. - fn waking_state(&self) -> u8 { - self.need_to_poll << 3 + self.poll_state.start_waking(self.need_to_poll)  }  }   -impl ArcWake for InnerWaker { +impl ArcWake for WrappedWaker {  fn wake_by_ref(self_arc: &Arc<Self>) {  if let Some((_, state_bomb)) = self_arc.start_waking() {  // Safety: now state is not `POLLING` @@ -244,24 +228,17 @@    if let Some(inner_waker) = waker_opt.clone() {  // Stop waking to allow polling stream - let poll_state_value = state_bomb.fire().unwrap(); + drop(state_bomb);   - // Here we want to call waker only if stream isn't woken yet and - // also to optimize the case when two wakers are called at the same time. - // - // In this case the best strategy will be to propagate only the latest waker's awake, - // and then poll both entities in a single `poll_next` call - if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { - // Wake up inner waker - inner_waker.wake(); - } + // Wake up inner waker + inner_waker.wake();  }  }  }  }    pin_project! { - /// Future which contains optional stream. + /// Future which polls optional inner stream.  ///  /// If it's `Some`, it will attempt to call `poll_next` on it,  /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` @@ -303,10 +280,10 @@    pin_project! {  /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) - /// method. - #[project = FlattenUnorderedProj] + /// method with ability to specify flow controller. 
+ #[project = FlattenUnorderedWithFlowControllerProj]  #[must_use = "streams do nothing unless polled"] - pub struct FlattenUnordered<St> where St: Stream { + pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {  #[pin]  inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,  #[pin] @@ -314,80 +291,110 @@  poll_state: SharedPollState,  limit: Option<NonZeroUsize>,  is_stream_done: bool, - inner_streams_waker: Arc<InnerWaker>, - stream_waker: Arc<InnerWaker>, + inner_streams_waker: Arc<WrappedWaker>, + stream_waker: Arc<WrappedWaker>, + flow_controller: PhantomData<Fc>  }  }   -impl<St> fmt::Debug for FlattenUnordered<St> +impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>  where  St: Stream + fmt::Debug,  St::Item: Stream + fmt::Debug,  {  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FlattenUnordered") + f.debug_struct("FlattenUnorderedWithFlowController")  .field("poll_state", &self.poll_state)  .field("inner_streams", &self.inner_streams)  .field("limit", &self.limit)  .field("stream", &self.stream)  .field("is_stream_done", &self.is_stream_done) + .field("flow_controller", &self.flow_controller)  .finish()  }  }   -impl<St> FlattenUnordered<St> +impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>  where  St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>,  St::Item: Stream + Unpin,  { - pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> { + pub(crate) fn new( + stream: St, + limit: Option<usize>, + ) -> FlattenUnorderedWithFlowController<St, Fc> {  let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);   - FlattenUnordered { + FlattenUnorderedWithFlowController {  inner_streams: FuturesUnordered::new(),  stream,  is_stream_done: false,  limit: limit.and_then(NonZeroUsize::new), - inner_streams_waker: Arc::new(InnerWaker { + inner_streams_waker: Arc::new(WrappedWaker {  inner_waker: UnsafeCell::new(None),  poll_state: poll_state.clone(),  need_to_poll: NEED_TO_POLL_INNER_STREAMS,  }), - stream_waker: Arc::new(InnerWaker { + stream_waker: Arc::new(WrappedWaker {  inner_waker: UnsafeCell::new(None),  poll_state: poll_state.clone(),  need_to_poll: NEED_TO_POLL_STREAM,  }),  poll_state, + flow_controller: PhantomData,  }  }    delegate_access_inner!(stream, St, ());  }   -impl<St> FlattenUnorderedProj<'_, St> +/// Returns the next flow step based on the received item. +pub trait FlowController<I, O> { + /// Handles an item producing `FlowStep` describing the next flow step. + fn next_step(item: I) -> FlowStep<I, O>; +} + +impl<I, O> FlowController<I, O> for () { + fn next_step(item: I) -> FlowStep<I, O> { + FlowStep::Continue(item) + } +} + +/// Describes the next flow step. +#[derive(Debug, Clone)] +pub enum FlowStep<C, R> { + /// Just yields an item and continues standard flow. + Continue(C), + /// Immediately returns an underlying item from the function. + Return(R), +} + +impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>  where  St: Stream,  { - /// Checks if current `inner_streams` size is less than optional limit. + /// Checks if current `inner_streams` bucket size is greater than optional limit.  
fn is_exceeded_limit(&self) -> bool {  self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())  }  }   -impl<St> FusedStream for FlattenUnordered<St> +impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>  where  St: FusedStream, - St::Item: FusedStream + Unpin, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin,  {  fn is_terminated(&self) -> bool {  self.stream.is_terminated() && self.inner_streams.is_empty()  }  }   -impl<St> Stream for FlattenUnordered<St> +impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>  where  St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>,  St::Item: Stream + Unpin,  {  type Item = <St::Item as Stream>::Item; @@ -398,17 +405,21 @@    let mut this = self.as_mut().project();   - let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() { - Some(value) => value, - _ => { - // Waker was called, just wait for the next poll - return Poll::Pending; + // Attempt to start polling, in case some waker is holding the lock, wait in loop + let (mut poll_state_value, state_bomb) = loop { + if let Some(value) = this.poll_state.start_polling() { + break value;  }  };   + // Safety: now state is `POLLING`. + unsafe { + WrappedWaker::replace_waker(this.stream_waker, cx); + WrappedWaker::replace_waker(this.inner_streams_waker, cx) + }; +  if poll_state_value & NEED_TO_POLL_STREAM != NONE { - // Safety: now state is `POLLING`. - let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) }; + let mut stream_waker = None;    // Here we need to poll the base stream.  // @@ -424,15 +435,35 @@    break;  } else { - match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) { - Poll::Ready(Some(inner_stream)) => { + let mut cx = Context::from_waker( + stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())), + ); + + match this.stream.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + let next_item_fut = match Fc::next_step(item) { + // Propagates an item immediately (the main use-case is for errors) + FlowStep::Return(item) => { + need_to_poll_next |= NEED_TO_POLL_STREAM + | (poll_state_value & NEED_TO_POLL_INNER_STREAMS); + poll_state_value &= !NEED_TO_POLL_INNER_STREAMS; + + next_item = Some(item); + + break; + } + // Yields an item and continues processing (normal case) + FlowStep::Continue(inner_stream) => { + PollStreamFut::new(inner_stream) + } + };  // Add new stream to the inner streams bucket - this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream)); + this.inner_streams.as_mut().push(next_item_fut);  // Inner streams must be polled afterward  poll_state_value |= NEED_TO_POLL_INNER_STREAMS;  }  Poll::Ready(None) => { - // Mark the stream as done + // Mark the base stream as done  *this.is_stream_done = true;  }  Poll::Pending => { @@ -444,15 +475,10 @@  }    if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { - // Safety: now state is `POLLING`. 
- let inner_streams_waker = - unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) }; + let inner_streams_waker = waker(this.inner_streams_waker.clone()); + let mut cx = Context::from_waker(&inner_streams_waker);   - match this - .inner_streams - .as_mut() - .poll_next(&mut Context::from_waker(&inner_streams_waker)) - { + match this.inner_streams.as_mut().poll_next(&mut cx) {  Poll::Ready(Some(Some((item, next_item_fut)))) => {  // Push next inner stream item future to the list of inner streams futures  this.inner_streams.as_mut().push(next_item_fut); @@ -472,15 +498,16 @@  // We didn't have any `poll_next` panic, so it's time to deactivate the bomb  state_bomb.deactivate();   + // Call the waker at the end of polling if  let mut force_wake =  // we need to poll the stream and didn't reach the limit yet  need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() - // or we need to poll inner streams again + // or we need to poll the inner streams again  || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;    // Stop polling and swap the latest state  poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); - // If state was changed during `POLLING` phase, need to manually call a waker + // If state was changed during `POLLING` phase, we also need to manually call a waker  force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;    let is_done = *this.is_stream_done && this.inner_streams.is_empty(); @@ -499,7 +526,7 @@    // Forwarding impl of Sink from the underlying stream  #[cfg(feature = "sink")] -impl<St, Item> Sink<Item> for FlattenUnordered<St> +impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>  where  St: Stream + Sink<Item>,  { 
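In the `SharedPollState` changes above, the separate per-waker waking bits collapse into a single `WAKING` flag, and a waker that fires while the stream is in the `POLLING` phase only records which side needs polling. The standalone model below is a sketch of that flag protocol, not the crate's internal API; the constant names mirror the ones in the hunk:

```rust
// Simplified model of the poll/wake flag protocol described above.
use std::sync::atomic::{AtomicU8, Ordering};

const NONE: u8 = 0;
const NEED_TO_POLL_STREAM: u8 = 0b10;
const POLLING: u8 = 0b100;
const WAKING: u8 = 0b1000;
const WOKEN: u8 = 0b10000;

/// Try to enter the POLLING phase; fails while some waker holds the WAKING bit.
fn start_polling(state: &AtomicU8) -> Option<u8> {
    state
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
            if v & WAKING == NONE {
                Some(POLLING)
            } else {
                None
            }
        })
        .ok()
}

/// A waker records what needs polling; it only takes the WAKING bit (and would
/// go on to call the real waker) if the stream is neither POLLING nor WOKEN
/// nor already WAKING.
fn start_waking(state: &AtomicU8, to_poll: u8) -> bool {
    let prev = state
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
            let mut next = v | to_poll;
            if v & (WOKEN | POLLING) == NONE {
                next |= WAKING;
            }
            if next != v {
                Some(next)
            } else {
                None
            }
        })
        .unwrap_or_else(|v| v);
    prev & (WOKEN | POLLING | WAKING) == NONE
}

fn main() {
    let state = AtomicU8::new(NONE);
    assert!(start_polling(&state).is_some()); // enter POLLING

    // A wake arriving during POLLING does not propagate; it only records the
    // NEED_TO_POLL_STREAM bit, which the poller re-checks when it stops polling.
    assert!(!start_waking(&state, NEED_TO_POLL_STREAM));
    assert_eq!(state.load(Ordering::SeqCst) & NEED_TO_POLL_STREAM, NEED_TO_POLL_STREAM);
}
```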
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index bb5e249..2da7036 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs 
@@ -181,32 +181,32 @@  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::scan::Scan;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::buffer_unordered::BufferUnordered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::buffered::Buffered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")] -mod flatten_unordered; +pub(crate) mod flatten_unordered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)]  pub use self::flatten_unordered::FlattenUnordered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  delegate_all!(  /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. @@ -216,20 +216,20 @@  where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U  );   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::for_each_concurrent::ForEachConcurrent;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "sink")]  #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]  #[cfg(feature = "alloc")]  mod split; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "sink")]  #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]  #[cfg(feature = "alloc")] @@ -323,6 +323,9 @@  /// wrapped version of it, similar to the existing `map` methods in the  /// standard library.  /// + /// See [`StreamExt::then`](Self::then) if you want to use a closure that + /// returns a future instead of a value. + ///  /// # Examples  ///  /// ``` @@ -467,6 +470,9 @@  /// Note that this function consumes the stream passed into it and returns a  /// wrapped version of it.  /// + /// See [`StreamExt::map`](Self::map) if you want to use a closure that + /// returns a value instead of a future. + ///  /// # Examples  ///  /// ``` @@ -774,7 +780,14 @@  }    /// Flattens a stream of streams into just one continuous stream. Polls - /// inner streams concurrently. + /// inner streams produced by the base stream concurrently. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. 
The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`.  ///  /// # Examples  /// @@ -807,14 +820,14 @@  /// assert_eq!(output, vec![1, 2, 3, 4]);  /// # });  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>  where  Self::Item: Stream + Unpin,  Self: Sized,  { - FlattenUnordered::new(self, limit.into()) + assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))  }    /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. @@ -863,7 +876,7 @@  ///  /// The first argument is an optional limit on the number of concurrently  /// polled streams. If this limit is not `None`, no more than `limit` streams - /// will be polled concurrently. The `limit` argument is of type + /// will be polled at the same time. The `limit` argument is of type  /// `Into<Option<usize>>`, and so can be provided as either `None`,  /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as  /// no limit at all, and will have the same result as passing in `None`. @@ -889,7 +902,7 @@  /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);  /// # });  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn flat_map_unordered<U, F>(  self, @@ -901,7 +914,7 @@  F: FnMut(Self::Item) -> U,  Self: Sized,  { - FlatMapUnordered::new(self, limit.into(), f) + assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))  }    /// Combinator similar to [`StreamExt::fold`] that holds internal state @@ -1129,7 +1142,7 @@  /// fut.await;  /// # })  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn for_each_concurrent<Fut, F>(  self, @@ -1352,7 +1365,7 @@  ///  /// This method is only available when the `std` or `alloc` feature of this  /// library is activated, and it is activated by default. - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn buffered(self, n: usize) -> Buffered<Self>  where @@ -1397,7 +1410,7 @@  /// assert_eq!(buffered.next().await, None);  /// # Ok::<(), i32>(()) }).unwrap();  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>  where @@ -1564,7 +1577,7 @@  /// library is activated, and it is activated by default.  #[cfg(feature = "sink")]  #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)  where 
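The expanded documentation above spells out how the `limit` argument to `flatten_unordered` is interpreted. A brief usage sketch, assuming the `futures` facade crate:

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        let inner = vec![
            stream::iter(vec![1, 2]),
            stream::iter(vec![3]),
            stream::iter(vec![4, 5]),
        ];

        // At most two inner streams are polled at the same time; ordering
        // across inner streams is not guaranteed, so sort before comparing.
        let mut out: Vec<_> = stream::iter(inner).flatten_unordered(2).collect().await;
        out.sort();
        assert_eq!(out, vec![1, 2, 3, 4, 5]);
    });
}
```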
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index e2034e0..1a7fdcb 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs 
@@ -15,6 +15,13 @@    impl<S> Unpin for SplitStream<S> {}   +impl<S> SplitStream<S> { + /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`. + pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool { + other.is_pair_of(&self) + } +} +  impl<S: Unpin> SplitStream<S> {  /// Attempts to put the two "halves" of a split `Stream + Sink` back  /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are @@ -60,6 +67,13 @@  }  }   +impl<S, Item> SplitSink<S, Item> { + /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool { + self.lock.is_pair_of(&other.0) + } +} +  impl<S: Sink<Item>, Item> SplitSink<S, Item> {  fn poll_flush_slot(  mut inner: Pin<&mut S>, @@ -142,3 +156,69 @@    #[cfg(feature = "std")]  impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{sink::Sink, stream::StreamExt}; + use core::marker::PhantomData; + + struct NopStream<Item> { + phantom: PhantomData<Item>, + } + + impl<Item> Stream for NopStream<Item> { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + todo!() + } + } + + impl<Item> Sink<Item> for NopStream<Item> { + type Error = (); + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + todo!() + } + + fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + todo!() + } + } + + #[test] + fn test_pairing() { + let s1 = NopStream::<()> { phantom: PhantomData }; + let (sink1, stream1) = s1.split(); + assert!(sink1.is_pair_of(&stream1)); + assert!(stream1.is_pair_of(&sink1)); + + let s2 = NopStream::<()> { phantom: PhantomData }; + let (sink2, stream2) = s2.split(); + assert!(sink2.is_pair_of(&stream2)); + assert!(stream2.is_pair_of(&sink2)); + + assert!(!sink1.is_pair_of(&stream2)); + assert!(!stream1.is_pair_of(&sink2)); + assert!(!sink2.is_pair_of(&stream1)); + assert!(!stream2.is_pair_of(&sink1)); + } +} 
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index bc4c6e4..7b55444 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs 
@@ -15,6 +15,7 @@  #[cfg(feature = "alloc")]  use alloc::vec::Vec;  use core::pin::Pin; +  use futures_core::{  future::{Future, TryFuture},  stream::TryStream, @@ -88,6 +89,14 @@  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_flatten::TryFlatten;   +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +mod try_flatten_unordered; +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_unordered::TryFlattenUnordered; +  mod try_collect;  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_collect::TryCollect; @@ -102,6 +111,12 @@  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_chunks::{TryChunks, TryChunksError};   +#[cfg(feature = "alloc")] +mod try_ready_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError}; +  mod try_fold;  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_fold::TryFold; @@ -118,26 +133,26 @@  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_take_while::TryTakeWhile;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod try_buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_buffer_unordered::TryBufferUnordered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod try_buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_buffered::TryBuffered;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  mod try_for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::try_for_each_concurrent::TryForEachConcurrent; @@ -151,6 +166,14 @@  #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411  pub use self::into_async_read::IntoAsyncRead;   +mod try_all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_all::TryAll; + +mod try_any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_any::TryAny; +  impl<S: ?Sized + TryStream> TryStreamExt for S {}    /// Adapters specific to `Result`-returning streams @@ -528,7 +551,7 @@  /// assert_eq!(Err(oneshot::Canceled), fut.await);  /// # })  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn try_for_each_concurrent<Fut, F>(  self, @@ -631,6 +654,55 @@  )  }   + /// An adaptor for chunking up successful, ready items of the stream inside 
a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. If the underlying stream + /// returns `Poll::Pending`, and the collected chunk is not empty, it will + /// be immidiatly returned. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryReadyChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_ready_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self> + where + Self: Sized, + { + assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>( + TryReadyChunks::new(self, capacity), + ) + } +  /// Attempt to filter the values produced by this stream according to the  /// provided asynchronous closure.  /// @@ -711,6 +783,63 @@  assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))  }   + /// Flattens a stream of streams into just one continuous stream. Produced streams + /// will be polled concurrently and any errors will be passed through without looking at them. + /// If the underlying base stream returns an error, it will be **immediately** propagated. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. 
+ /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); + /// }); + /// + /// let stream = rx3.try_flatten_unordered(None); + /// let mut values: Vec<_> = stream.collect().await; + /// values.sort(); + /// + /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); + /// # }); + /// ``` + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> + where + Self::Ok: TryStream + Unpin, + <Self::Ok as TryStream>::Error: From<Self::Error>, + Self: Sized, + { + assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( + TryFlattenUnordered::new(self, limit), + ) + } +  /// Flattens a stream of streams into just one continuous stream.  ///  /// If this stream's elements are themselves streams then this combinator @@ -900,7 +1029,7 @@  /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));  /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>  where @@ -976,7 +1105,7 @@  /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));  /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();  /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  fn try_buffered(self, n: usize) -> TryBuffered<Self>  where @@ -1061,4 +1190,62 @@  {  crate::io::assert_read(IntoAsyncRead::new(self))  } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items + /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found + /// that does not satisfy the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>); + /// let positive = number_stream.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let positive = stream_with_errors.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Err("err")); + /// # }); + /// ``` + fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F> + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future<Output = bool>, + { + assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f)) + } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items + /// satisfy the predicate. 
Exits early if an `Err` is encountered or if an `Ok` item is found + /// that satisfies the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>); + /// let contain_three = number_stream.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Err("err")); + /// # }); + /// ``` + fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F> + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future<Output = bool>, + { + assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f)) + }  } 
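`try_any` and `try_all` (added above) stop at whichever comes first: a decisive `Ok` item or an `Err`. A small sketch of that ordering for `try_any`, assuming the `futures` facade crate:

```rust
use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

fn main() {
    block_on(async {
        // try_any short-circuits on the first matching Ok item, so the later
        // Err is never reached.
        let s = stream::iter(vec![Ok::<_, &str>(1), Ok(3), Err("boom")]);
        assert_eq!(s.try_any(|x| async move { x == 3 }).await, Ok(true));

        // An Err seen before any match is propagated instead.
        let s = stream::iter(vec![Ok::<_, &str>(1), Err("boom"), Ok(3)]);
        assert_eq!(s.try_any(|x| async move { x == 3 }).await, Err("boom"));
    });
}
```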
diff --git a/src/stream/try_stream/try_all.rs b/src/stream/try_stream/try_all.rs new file mode 100644 index 0000000..8179f86 --- /dev/null +++ b/src/stream/try_stream/try_all.rs 
@@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_all`](super::TryStreamExt::try_all) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAll<St, Fut, F> { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option<Fut>, + } +} + +impl<St, Fut, F> fmt::Debug for TryAll<St, Fut, F> +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAll") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl<St, Fut, F> TryAll<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl<St, Fut, F> FusedFuture for TryAll<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl<St, Fut, F> Future for TryAll<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + type Output = Result<bool, St::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if !acc { + *this.done = true; + break Ok(false); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(true); + } + } + } else { + panic!("TryAll polled after completion") + } + }) + } +} 
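The `TryAll` future above mirrors the plain `All`/`Any` combinators but over a `Result` item type: the first `Ok` item that fails the predicate resolves the future to `Ok(false)` without looking at anything after it, while an `Err` met along the way is propagated. A short sketch, assuming the `futures` facade crate:

```rust
use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

fn main() {
    block_on(async {
        // The failing Ok(0) short-circuits to Ok(false); the later Err is
        // never polled.
        let s = stream::iter(vec![Ok::<_, &str>(2), Ok(0), Err("boom")]);
        assert_eq!(s.try_all(|x| async move { x > 0 }).await, Ok(false));

        // With every Ok item passing so far, an Err encountered next is
        // propagated.
        let s = stream::iter(vec![Ok::<_, &str>(2), Err("boom"), Ok(3)]);
        assert_eq!(s.try_all(|x| async move { x > 0 }).await, Err("boom"));
    });
}
```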
diff --git a/src/stream/try_stream/try_any.rs b/src/stream/try_stream/try_any.rs new file mode 100644 index 0000000..55e876b --- /dev/null +++ b/src/stream/try_stream/try_any.rs 
@@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_any`](super::TryStreamExt::try_any) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAny<St, Fut, F> { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option<Fut>, + } +} + +impl<St, Fut, F> fmt::Debug for TryAny<St, Fut, F> +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAny") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl<St, Fut, F> TryAny<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl<St, Fut, F> FusedFuture for TryAny<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl<St, Fut, F> Future for TryAny<St, Fut, F> +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = bool>, +{ + type Output = Result<bool, St::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if acc { + *this.done = true; + break Ok(true); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(false); + } + } + } else { + panic!("TryAny polled after completion") + } + }) + } +} 
diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs index 3bb253a..ec53f4b 100644 --- a/src/stream/try_stream/try_chunks.rs +++ b/src/stream/try_stream/try_chunks.rs 
@@ -41,9 +41,10 @@  delegate_access_inner!(stream, St, (. .));  }   +type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; +  impl<St: TryStream> Stream for TryChunks<St> { - #[allow(clippy::type_complexity)] - type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>; + type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {  let mut this = self.as_mut().project(); 
diff --git a/src/stream/try_stream/try_flatten_unordered.rs b/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 0000000..a74dfc4 --- /dev/null +++ b/src/stream/try_stream/try_flatten_unordered.rs 
@@ -0,0 +1,176 @@ +use core::marker::PhantomData; +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::flatten_unordered::{ + FlattenUnorderedWithFlowController, FlowController, FlowStep, +}; +use crate::stream::IntoStream; +use crate::TryStreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered<St>( + FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into<Option<usize>>| + FlattenUnorderedWithFlowController::new( + NestedTryStreamIntoEitherTryStream::new(stream), + limit.into() + ) + ] + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> +); + +pin_project! { + /// Emits either successful streams or single-item streams containing the underlying errors. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct NestedTryStreamIntoEitherTryStream<St> + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> + { + #[pin] + stream: St + } +} + +impl<St> NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Emits a single item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct Single<T>(Option<T>); + +impl<T> Single<T> { + /// Constructs new `Single` with the given value. + fn new(val: T) -> Self { + Self(Some(val)) + } + + /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. + fn next_immediate(&mut self) -> Option<T> { + self.0.take() + } +} + +impl<T> Unpin for Single<T> {} + +impl<T> Stream for Single<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +/// Immediately propagates errors occurred in the base stream. 
+#[derive(Debug, Clone, Copy)] +pub struct PropagateBaseStreamError<St>(PhantomData<St>); + +type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; +type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; + +impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { + match item { + // A new successful inner stream received + st @ Either::Left(_) => FlowStep::Continue(st), + // An error encountered + Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), + } + } +} + +type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; + +impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = match item { + Some(res) => match res { + // Emit successful inner stream as is + Ok(stream) => Either::Left(stream.into_stream()), + // Wrap an error into a stream containing a single item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(Single::new(res)) + } + }, + None => return Poll::Ready(None), + }; + + Poll::Ready(Some(out)) + } +} + +impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + FusedStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + Sink<Item>, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>, +{ + type Error = <St as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} 
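`try_flatten_unordered` above builds on the new flow-controller hook: successful inner streams are flattened concurrently, while a base-stream error is routed through `FlowStep::Return` and surfaces as a single error item, converted with the `From` bound on the inner error type. A sketch with two distinct error types, assuming the `futures` facade crate:

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryStreamExt};

// Two error types to exercise the `<St::Ok as TryStream>::Error: From<St::Error>` bound.
#[derive(Debug, PartialEq)]
struct OuterErr;
#[derive(Debug, PartialEq)]
struct InnerErr;

impl From<OuterErr> for InnerErr {
    fn from(_: OuterErr) -> Self {
        InnerErr
    }
}

fn main() {
    block_on(async {
        let inner_ok = stream::iter(vec![Ok::<i32, InnerErr>(1), Ok(2)]);
        // The base stream yields one successful inner stream and one error.
        let base = stream::iter(vec![Ok::<_, OuterErr>(inner_ok), Err(OuterErr)]);

        let mut out: Vec<Result<i32, InnerErr>> =
            base.try_flatten_unordered(None).collect().await;

        // Inner items pass through unchanged; the base-stream error is
        // converted via From and emitted as a single error item. Relative
        // order of Ok vs Err is not guaranteed, so move errors to the end.
        out.sort_by_key(|r| r.is_err());
        assert_eq!(out, vec![Ok(1), Ok(2), Err(InnerErr)]);
    });
}
```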
diff --git a/src/stream/try_stream/try_ready_chunks.rs b/src/stream/try_stream/try_ready_chunks.rs new file mode 100644 index 0000000..8b1470e --- /dev/null +++ b/src/stream/try_stream/try_ready_chunks.rs 
@@ -0,0 +1,126 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::fmt; +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryReadyChunks<St: TryStream> { + #[pin] + stream: Fuse<IntoStream<St>>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<St: TryStream> TryReadyChunks<St> { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + } + + delegate_access_inner!(stream, St, (. .)); +} + +type TryReadyChunksStreamError<St> = + TryReadyChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; + +impl<St: TryStream> Stream for TryReadyChunks<St> { + type Item = Result<Vec<St::Ok>, TryReadyChunksStreamError<St>>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.as_mut().project(); + + let mut items: Vec<St::Ok> = Vec::new(); + + loop { + match this.stream.as_mut().poll_next(cx) { + // Flush all the collected data if the underlying stream doesn't + // contain more ready values + Poll::Pending => { + return if items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(Ok(items))) + } + } + + // Push the ready item into the buffer and check whether it is full. + // If so, return the buffer. + Poll::Ready(Some(Ok(item))) => { + if items.is_empty() { + items.reserve_exact(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(Ok(items))); + } + } + + // Return the already collected items and the error. + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(TryReadyChunksError(items, e)))); + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if items.is_empty() { None } else { Some(Ok(items)) }; + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + let lower = lower / self.cap; + (lower, upper) + } +} + +impl<St: TryStream + FusedStream> FusedStream for TryReadyChunks<St> { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Item> Sink<Item> for TryReadyChunks<S> +where + S: TryStream + Sink<Item>, +{ + type Error = <S as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occurred, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryReadyChunksError<T, E>(pub Vec<T>, pub E); + +impl<T, E: fmt::Debug> fmt::Debug for TryReadyChunksError<T, E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl<T, E: fmt::Display> fmt::Display for TryReadyChunksError<T, E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryReadyChunksError<T, E> {} 
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 7d8ef6b..2f48ccc 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs 
@@ -36,7 +36,7 @@  /// let stream = stream::unfold(0, |state| async move {  /// if state <= 2 {  /// let next_state = state + 1; -/// let yielded = state * 2; +/// let yielded = state * 2;  /// Some((yielded, next_state))  /// } else {  /// None 
diff --git a/src/task/mod.rs b/src/task/mod.rs index 0a31eea..7a9e993 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs 
@@ -18,19 +18,22 @@  pub use futures_task::noop_waker;  pub use futures_task::noop_waker_ref;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use futures_task::ArcWake;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use futures_task::waker;   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]  #[cfg(feature = "alloc")]  pub use futures_task::{waker_ref, WakerRef};   -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr( + target_os = "none", + cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")) +)]  pub use futures_core::task::__internal::AtomicWaker;    mod spawn;
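The recurring attribute swap in this patch, from `#[cfg(not(futures_no_atomic_cas))]` to `#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]`, replaces the old build-script cfg with the compiler's own `target_has_atomic` check, applied only on bare-metal (`target_os = "none"`) targets. A standalone illustration of how that attribute expands; the function name is a placeholder, not an item from futures-util:

```rust
// On hosted targets, target_os != "none", so the cfg_attr expands to nothing
// and the item below is always compiled. On a bare-metal target
// (target_os = "none") it expands to #[cfg(target_has_atomic = "ptr")], so the
// item only exists when pointer-sized atomics are available.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
fn requires_pointer_atomics() {}

fn main() {
    // Compiles and runs on any hosted target.
    requires_pointer_atomics();
}
```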